"""
基于qthread的服务器
直接基于socket。

"""

import json
import time
from typing import List, Tuple, Dict, Union

from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QObject, QThread, QTimer, pyqtSignal
import socket  # 导入 socket 模块
import sys
import logging

from .util import receive, timeit, strip_byte_end

logger = logging.getLogger(__name__)

g_conn_pool = []


def parse_splicing_packets(packet_bytes: bytes) -> List[bytes]:
    return packet_bytes.split(b'PMEND')


class MessageWork(QObject):
    signal_message_received = pyqtSignal(socket.socket, bytes)
    signal_work_finished = pyqtSignal()
    signal_socket_closed = pyqtSignal(socket.socket)

    def __init__(self, server: 'PMGServer'):
        super(MessageWork, self).__init__()
        self.server = server

    def work(self):
        self.message_handle(client=self.client)
        self.signal_work_finished.emit()

    def handle_packet(self, client: socket.socket, packet: bytes):
        try:
            dic = json.loads(packet)
            func = self.server.dispatcher_dic[dic['method']]
            args = dic['params']
            client.sendall(func(*args).encode('utf-8') + b'PMEND')
        except:
            client.close()
            self.signal_socket_closed.emit(client)
            logger.warning(repr(self.server.long_conn_sockets))
            return b''

    def message_handle(self, client):
        """
        消息处理
        """
        while True:
            logger.info('client waiting!' + repr(client))
            logger.info('conn_pool_length:%d ,pool is: %s' % (len(self.server.long_conn_sockets.items()),
                                                              repr(self.server.long_conn_sockets)))
            try:
                bytes = receive(client)
                for packet in parse_splicing_packets(bytes):
                    self.handle_packet(client, packet)
            except:
                import traceback
                traceback.print_exc()
                client.close()
                # 删除连接
                self.signal_socket_closed.emit(client)
                logger.info("客户端%s下线了。" % repr(client))
                break
            if len(bytes) == 0:
                client.close()
                self.signal_socket_closed.emit(client)
                logger.info("客户端%s下线了。" % repr(client))
                break


class LoopWork(QObject):
    def __init__(self, server_obj: 'PMGServer', server_socket: socket.socket):
        super().__init__()
        self.server_socket = server_socket
        self.server_obj = server_obj
        self.threads = []

    def work(self):
        self.accept_client(self.server_socket)

    def dict_to_byte_msg(self, dic: Dict) -> bytes:
        b = (json.dumps(dic) + 'PMEND').encode('utf-8')
        return b

    def send_dict(self, sock: socket.socket, dic: Dict):
        sock.sendall(self.dict_to_byte_msg(dic))

    def generate_response_template(self) -> Dict[str, Union[str, int, Dict, List, float]]:
        payload = {'message': 'succeeded', 'content': '', 'timestamp': time.time()}
        return payload

    def accept_client(self, server_socket):
        """
        接收新连接.
        请求长连接的时候就把连接放到连接池中
        请求短链接的时候就直接处理。
        """
        while True:
            try:
                sock, _ = server_socket.accept()  # 阻塞，等待客户端连接
                # 加入连接池
                conn_message = sock.recv(1024)
                conn_message = strip_byte_end(conn_message)
                try:
                    message_dic = json.loads(conn_message)
                    if message_dic.get('method') == 'start_long_connection':
                        name = message_dic.get('name')
                        if name != None:
                            if message_dic.get('name') not in self.server_obj.long_conn_sockets.keys():
                                self.server_obj.long_conn_sockets[name] = sock
                                response = self.generate_response_template()
                                response['message'] = 'succeeded'
                                response['content'] = 'socket connected'
                                self.send_dict(sock, response)
                            else:
                                logger.info('socket named \'%s\' is already connected!' % name)
                                response = self.generate_response_template()
                                response['message'] = 'failed'
                                response['content'] = 'socket name \'%s\' already connected' % name
                                self.send_dict(sock, response)
                        else:
                            response = self.generate_response_template()
                            response['message'] = 'failed'
                            response['content'] = 'socket name is None'
                            self.send_dict(sock, response)
                            logger.info('name is None, invalid request content!')
                    else:  # 如果请求的不是一个长连接，就直接进行处理，

                        response = self.generate_response_template()
                        response['message'] = 'succeeded'
                        response['content'] = 'connection established'
                        self.send_dict(sock, response)
                        b = receive(sock)
                        logger.info(repr(b))
                        self.handle_packet(sock, b)
                except:
                    import traceback
                    traceback.print_exc()
                    logger.info('failed to decode json:%s' % str(conn_message))
                    response = self.generate_response_template()
                    response['message'] = 'failed'
                    response['content'] = 'invalid request json:\n%s' % str(conn_message)
                    self.send_dict(sock, response)
            except:
                import traceback
                traceback.print_exc()
                break

    def handle_packet(self, client: socket.socket, packet: bytes) -> None:
        try:
            try:
                dic = json.loads(packet)
                func = self.server_obj.dispatcher_dic[dic['method']]
                args = dic['params']
                vy = func(*args).encode('utf-8') + b'PMEND'
                client.sendall(vy)
            except json.decoder.JSONDecodeError:
                vy = json.dumps({'message': 'failed'}).encode('utf-8') + b'PMEND'
                client.sendall(vy)

        except:
            import traceback
            traceback.print_exc()
            client.close()

            logger.warning(repr(g_conn_pool))


class PMGServer(QObject):
    def __init__(self, address=Tuple[str, int], parent=None):
        super().__init__(parent)
        self.dispatcher_dic = {}
        self.long_conn_sockets: Dict[str, socket.socket] = {}
        self.socket = self.init_socket(address)
        self.server_loop_thread = QThread()
        self.loop_worker = LoopWork(self, self.socket)
        self.loop_worker.moveToThread(self.server_loop_thread)

        self.server_loop_thread.started.connect(self.loop_worker.work)
        self.server_loop_thread.start()

    def init_socket(self, address):
        """
        初始化套接字
        """
        g_socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 创建 socket 对象
        # g_socket_server.settimeout(5)
        # g_socket_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        g_socket_server.bind(address)
        g_socket_server.listen(5)  # 最大等待数（有很多人理解为最大连接数，其实是错误的）
        logger.info("服务端已启动，等待客户端连接...")
        return g_socket_server

    def broadcast_message(self, message_dic: dict = None):
        """
        广播信息
        message:传递信息
        # 'DATA_CHANGED'
        # 'SHUT_DOWN'
        :param clients:
        :return:
        """

        if message_dic is None:
            message_dic = {'name': 'broadcast', 'message': 'Are you alive?'}
        ids = []
        logger.info('broadcast message:' + repr(message_dic))
        for k in self.long_conn_sockets.keys():
            try:
                self.long_conn_sockets[k].sendall(json.dumps(message_dic).encode('utf8') + b'PMEND')
            except ConnectionResetError:
                ids.append(k)
                logger.info('Connection \'%s\' closed!' % k)
            except:
                import traceback
                traceback.print_exc()
                ids.append(k)
        logger.info('died connections:' + repr(ids))
        for not_used_socket_name in ids:
            sock = self.long_conn_sockets.pop(not_used_socket_name)
            sock.close()


if __name__ == '__main__':
    app = QApplication(sys.argv)
    ADDRESS = ('127.0.0.1', 12306)  # 绑定地址
    s = PMGServer(ADDRESS)

    qtimer = QTimer()
    qtimer.start(2000)
    qtimer.timeout.connect(lambda: s.broadcast_message(None))
    sys.exit(app.exec_())
